#!/usr/bin/python3
# TLP Profiles Daemon (tlp-pd)
# tlp-pd implements the D-Bus interface org.freedesktop.UPower.PowerProfiles
# which lets desktop environments show a power profile switch.
# TLP is used as the backend to apply these profiles.
#
# Copyright (c) 2025 Thomas Koch <linrunner at gmx.net> and others.
# SPDX-License-Identifier: GPL-2.0-or-later

import dbus
import dbus.service
import dbus.mainloop.glib
from gi.repository import GLib

import argparse
import inspect
import logging
import logging.handlers
import os
import signal
import string
import subprocess
import sys
from syslog import LOG_DAEMON
import xml.etree.ElementTree as Xtree

# --- Constants
# Human-readable daemon name; used for the CLI description and log messages
DAEMON_NAME = "TLP Profiles Daemon"
# Profile names accepted by the D-Bus API. The list position matters:
# _get_tlp_profile() uses the statefile value as an index into this list.
AVAILABLE_PROFILES = ["performance", "balanced", "power-saver"]

# --- D-Bus constants
# Modern API: well-known bus name, interface name (identical) and object path
BUS_NAME = "org.freedesktop.UPower.PowerProfiles"
INTERFACE_NAME = BUS_NAME
OBJECT_PATH = "/org/freedesktop/UPower/PowerProfiles"
# Legacy API: same service offered under the older name and path
BUS_NAME_LEGACY = "net.hadess.PowerProfiles"
INTERFACE_NAME_LEGACY = BUS_NAME_LEGACY
OBJECT_PATH_LEGACY = "/net/hadess/PowerProfiles"

# --- TLP constants
# Statefile read by _get_tlp_profile() to determine the last applied profile
# NOTE(review): '@TLP_RUN@' looks like a placeholder that is presumably
# substituted at build/install time -- confirm against the build scripts
TLP_LAST_PROFILE = "@TLP_RUN@/last_pwr"
# Valid statefile values; each one is the index of a profile in AVAILABLE_PROFILES
TLP_PROFILE_IDX = ["0", "1", "2"]

# --- Introspection
# Problem: the imported module 'dbus' does not support property decorations,
# thus property declarations are missing from the XML result of
# 'dbus.service.Object.Introspect()', rendering the introspection incomplete.
# Workaround: insert the missing declarations before returning.

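# Only for reference (this string is not used anywhere else in this file):
# introspection XML roughly as generated by 'dbus'. '{INTERFACE_NAME}' is a
# literal placeholder for the interface name, and the XML comment marks the
# spot where the property declarations below belong.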
INTROSPECTION_FULL_XML = """
<!DOCTYPE node PUBLIC "-//freedesktop//DTD D-BUS Object Introspection 1.0//EN"
"http://www.freedesktop.org/standards/dbus/1.0/introspect.dtd">
<node>
  <interface name="org.freedesktop.DBus.Properties">
    <method name="Get">
      <arg type="s" name="interface_name" direction="in"/>
      <arg type="s" name="property_name" direction="in"/>
      <arg type="v" name="value" direction="out"/>
    </method>
    <method name="GetAll">
      <arg type="s" name="interface_name" direction="in"/>
      <arg type="a{sv}" name="properties" direction="out"/>
    </method>
    <method name="Set">
      <arg type="s" name="interface_name" direction="in"/>
      <arg type="s" name="property_name" direction="in"/>
      <arg type="v" name="value" direction="in"/>
    </method>
    <signal name="PropertiesChanged">
      <arg type="s" name="interface_name"/>
      <arg type="a{sv}" name="changed_properties"/>
      <arg type="as" name="invalidated_properties"/>
    </signal>
  </interface>
  <interface name="org.freedesktop.DBus.Introspectable">
    <method name="Introspect">
      <arg type="s" name="xml_data" direction="out"/>
    </method>
  </interface>
  <interface name="{INTERFACE_NAME}">
    <method name="HoldProfile">
      <arg type="s" name="profile" direction="in"/>
      <arg type="s" name="reason" direction="in"/>
      <arg type="s" name="application_id" direction="in"/>
      <arg type="u" name="cookie" direction="out"/>
    </method>
    <method name="ReleaseProfile">
      <arg type="u" name="cookie" direction="in"/>
    </method>
    <method name="SetActionEnabled">
        <arg name="action" type="s" direction="in"/>
        <arg name="enabled" type="b" direction="in"/>
    </method>
    <method name="SyncProfile">
      <arg type="s" name="profile" direction="in"/>
    </method>
    <signal name="ProfileReleased">
      <arg type="u" name="cookie"/>
    </signal>
    <!-- PROPERTIES INSERTED HERE -->
  </interface>
</node>
"""

# Property declarations to insert
# Introspect() wraps this fragment in a temporary <root> element, parses it
# and appends every <property> to the <interface> element of the addressed
# interface. The list must be kept in sync with the properties served by
# Get(), GetAll() and Set().
INTROSPECTION_PROPERTIES_XML = """
    <property type="s" name="ActiveProfile" access="readwrite"/>
    <property type="s" name="PerformanceInhibited" access="read"/>
    <property type="s" name="PerformanceDegraded" access="read"/>
    <property type="aa{sv}" name="Profiles" access="read"/>
    <property type="as" name="Actions" access="read"/>
    <property type="aa{sv}" name="ActionsInfo" access="read"/>
    <property type="aa{sv}" name="ActiveProfileHolds" access="read"/>
    <property type="b" name="BatteryAware" access="readwrite"/>
    <property type="s" name="LogLevel" access="readwrite"/>
    <property type="s" name="Version" access="read"/>
"""


# --- D-Bus service implementing the power-profiles-daemon API
class ProfilesDaemon(dbus.service.Object):
    def __init__(self, bus, logger, loglevel):
        # Export the daemon object on the bus and set up its internal state.
        # Args:
        # - bus: bus connection to export on (kept for _check_polkit_auth())
        # - logger: logger receiving all daemon messages
        # - loglevel: initial value of the LogLevel property
        dbus.service.Object.__init__(self, bus, OBJECT_PATH)
        # A second registration under the legacy path serves clients of
        # net.hadess.PowerProfiles; the flag must be set before adding it
        self.SUPPORTS_MULTIPLE_OBJECT_PATHS = True
        dbus.service.Object.add_to_connection(self, bus, OBJECT_PATH_LEGACY)

        # Collaborators
        self._bus = bus
        self._logger = logger
        self._loglevel = loglevel
        # Interfaces that received Get(), GetAll() or Set() so far
        self._ifaces_seen = set()

        # Profile state: both values start out as the profile found in the
        # TLP statefile
        startup_profile = self._get_tlp_profile()
        self._active_profile = startup_profile  # Last profile applied by TLP
        self._selected_profile = startup_profile  # Last profile received by Set(ActiveProfile)

        # Static property values
        self._profiles = []
        for profile_name in AVAILABLE_PROFILES:
            self._profiles.append(
                {
                    "Profile": profile_name,
                    "CpuDriver": "tlp",
                    "PlatformDriver": "tlp",
                }
            )
        self._performance_degraded = ""
        self._actions = []
        self._actions_info = []
        self._version = "tlp-pd @TLPVER@"
        self._battery_aware = False

        # Hold bookkeeping: cookie -> {profile, reason, application_id, sender}
        self._cookie_counter = 0
        self._holds = {}

    # --- Interfaces org.freedesktop.UPower.PowerProfiles and net.hadess.PowerProfiles (legacy)

    @dbus.service.method(
        dbus_interface=INTERFACE_NAME,
        in_signature="sss",
        out_signature="u",
        sender_keyword="sender",
    )
    @dbus.service.method(
        dbus_interface=INTERFACE_NAME_LEGACY,
        in_signature="sss",
        out_signature="u",
        sender_keyword="sender",
    )
    def HoldProfile(self, profile, reason, application_id, sender=None):
        # Place a hold on a profile. The hold lasts until
        # - it is released by ReleaseProfile(), or
        # - the user selects a profile via Set(ActiveProfile)
        # Returns: the cookie identifying the new hold
        # Raises: DBusException (InvalidArgs) for an unknown profile,
        #   DBusException (NotAuthorized) if PolicyKit denies the request
        _method = _method_name()
        profile, reason, application_id, sender = (
            _sanitize_input(arg) for arg in (profile, reason, application_id, sender)
        )

        if profile not in AVAILABLE_PROFILES:
            raise dbus.exceptions.DBusException(
                f"{_method}(): Invalid profile '{profile}'",
                name="org.freedesktop.DBus.Error.InvalidArgs",
            )

        # The caller needs the hold-profile privilege
        authorized = self._check_polkit_auth(sender, f"{BUS_NAME}.hold-profile")
        if not authorized:
            raise dbus.exceptions.DBusException(
                f"{_method}(): Not authorized to hold power profile",
                name="org.freedesktop.PolicyKit1.Error.NotAuthorized",
            )

        # Hand out the next cookie and remember the hold under it
        self._cookie_counter += 1
        cookie = self._cookie_counter
        self._holds[cookie] = dict(
            profile=profile,
            reason=reason,
            application_id=application_id,
            sender=sender,
        )

        self._logger.debug(
            f"{_method}(profile='{profile}', reason='{reason}', appid='{application_id}', sender='{sender}'): cookie: '{cookie}'"
        )

        # Let the active profile follow the new set of holds
        self._update_profile_from_holds()

        return dbus.UInt32(cookie)

    @dbus.service.method(
        dbus_interface=INTERFACE_NAME,
        in_signature="u",
        out_signature="",
        sender_keyword="sender",
    )
    @dbus.service.method(
        dbus_interface=INTERFACE_NAME_LEGACY,
        in_signature="u",
        out_signature="",
        sender_keyword="sender",
    )
    def ReleaseProfile(self, cookie, sender=None):
        # Give up the hold identified by cookie; afterwards the active profile
        # reverts to
        # - the profile of the newest remaining hold, or
        # - the last profile selected by the user if no hold is left
        # Raises: DBusException (InvalidArgs) if the cookie is unknown
        cookie = int(cookie)
        sender = _sanitize_input(sender)
        _method = _method_name()

        # Stored holds are dicts, so None reliably signals "no such cookie"
        hold_rel = self._holds.pop(cookie, None)
        if hold_rel is None:
            raise dbus.exceptions.DBusException(
                f"{_method}(): Invalid cookie '{cookie}'",
                name="org.freedesktop.DBus.Error.InvalidArgs",
            )

        self._logger.debug(
            f"{_method}(cookie='{cookie}', sender='{sender}'): hold: '{hold_rel['profile']}', appid: '{hold_rel['application_id']}'"
        )

        # Let the active profile follow the remaining holds
        self._update_profile_from_holds()

    @dbus.service.method(
        dbus_interface=INTERFACE_NAME, in_signature="sb", out_signature=""
    )
    @dbus.service.method(
        dbus_interface=INTERFACE_NAME_LEGACY,
        in_signature="sb",
        out_signature="",
    )
    def SetActionEnabled(self, action, enabled):
        # Enable or disable a particular action.
        # tlp-pd does not implement any actions yet, therefore every request
        # is rejected as an unknown action (InvalidArgs).
        error_text = f"{_method_name()}(): No such action '{_sanitize_input(action)}'"
        raise dbus.exceptions.DBusException(
            error_text, name="org.freedesktop.DBus.Error.InvalidArgs"
        )

    @dbus.service.method(
        dbus_interface=INTERFACE_NAME,
        in_signature="s",
        out_signature="",
        sender_keyword="sender",
    )
    def SyncProfile(self, profile, sender=None):
        # Callback for tlp after it changed the profile: records the new
        # profile and announces it, nothing is applied here
        # Raises: DBusException (InvalidArgs) for an unknown profile
        _method = _method_name()
        profile = _sanitize_input(profile)
        sender = _sanitize_input(sender)
        self._logger.debug(f"{_method}(profile='{profile}', sender='{sender}')")

        profile_is_known = profile in AVAILABLE_PROFILES
        if not profile_is_known:
            raise dbus.exceptions.DBusException(
                f"{_method}(): Invalid profile '{profile}'",
                name="org.freedesktop.DBus.Error.InvalidArgs",
            )

        self._active_profile = profile

        # Announce the new profile on every interface that received Get(),
        # GetAll() or Set() so far. Installations may mix components talking
        # to the modern interface (UPower) with components talking to the
        # legacy one (e.g., Cinnamon desktop), so both may need the signal.
        for seen_iface in self._ifaces_seen:
            self.PropertiesChanged(seen_iface, {"ActiveProfile": profile}, [])

    @dbus.service.signal(dbus_interface=INTERFACE_NAME, signature="u")
    # Exported on the interface org.freedesktop.UPower.PowerProfiles
    # Emitted when a profile hold is released
    ### TODO: Signal is not emitted
    def ProfileReleased(self, cookie):
        # The body merely logs the call; nothing is sent explicitly from here
        self._logger.debug(f"Signal {_method_name()}()")

    # --- Interface org.freedesktop.DBus.Peer
    # Note: for reasons still unknown, the parent class methods are called directly.
    # Therefore, the declarations for this interface only provide introspection.

    @dbus.service.method(
        dbus_interface=dbus.PEER_IFACE, in_signature="", out_signature=""
    )
    def Ping(self):
        # Declared so that org.freedesktop.DBus.Peer shows up in the
        # introspection data; the body just delegates to the parent class.
        # NOTE(review): confirm that the parent class really provides Ping();
        # if it does not, this line would raise AttributeError should it
        # ever be reached.
        # Call parent implementation
        return super().Ping()

    @dbus.service.method(
        dbus_interface=dbus.PEER_IFACE, in_signature="", out_signature="s"
    )
    def GetMachineId(self):
        # Declared so that org.freedesktop.DBus.Peer shows up in the
        # introspection data; the body just delegates to the parent class.
        # NOTE(review): confirm that the parent class really provides
        # GetMachineId(); if it does not, this line would raise
        # AttributeError should it ever be reached.
        # Call parent implementation
        return super().GetMachineId()

    # --- Interface org.freedesktop.DBus.Introspectable

    @dbus.service.method(
        dbus_interface=dbus.INTROSPECTABLE_IFACE,
        in_signature="",
        out_signature="s",
        path_keyword="object_path",
        connection_keyword="connection",
    )
    def Introspect(self, object_path, connection):
        # Return introspection XML describing this service, completed with
        # the property declarations from INTROSPECTION_PROPERTIES_XML.
        # Args:
        # - object_path: object path the request was addressed to
        # - connection: bus connection the request arrived on
        # Returns: the introspection XML as a string. If the expected
        #   <interface> element cannot be found, the XML is returned without
        #   the property declarations and a warning is logged.
        # Raises: DBusException (UnknownObject) for any path other than
        #   OBJECT_PATH and OBJECT_PATH_LEGACY
        object_path = _sanitize_input(object_path)
        # Note: connection is not a string, do *not* sanitize
        _method = _method_name()
        self._logger.debug(
            f"{_method}(object_path='{object_path}', connection='{connection}')"
        )

        # Reject unknown paths before generating any XML
        if object_path not in (OBJECT_PATH, OBJECT_PATH_LEGACY):
            raise dbus.exceptions.DBusException(
                f"{_method}(): No such object path '{object_path}'",
                name="org.freedesktop.DBus.Error.UnknownObject",
            )

        # Get 'dbus' introspection XML in a string
        intro_xml_str = super().Introspect(object_path, connection)
        iface = INTERFACE_NAME
        if object_path == OBJECT_PATH_LEGACY:
            # Inject legacy interface name
            intro_xml_str = intro_xml_str.replace(INTERFACE_NAME, INTERFACE_NAME_LEGACY)
            iface = INTERFACE_NAME_LEGACY

        # Parse string to XML and locate the <interface name="iface"> element
        xml_root = Xtree.fromstring(intro_xml_str)
        xml_interface = xml_root.find(f".//interface[@name='{iface}']")
        if xml_interface is None:
            # Incomplete introspection data is still more useful to the
            # caller than an AttributeError
            self._logger.warning(
                f"{_method}(): interface '{iface}' not found in introspection XML, properties not inserted"
            )
            return intro_xml_str

        # Parse properties-to-be-inserted; the wrapper element is needed
        # because the fragment has no single root of its own
        xml_properties = Xtree.fromstring(
            f"<root>{INTROSPECTION_PROPERTIES_XML}</root>"
        )
        # Append properties to the interface element
        for prop in xml_properties:
            xml_interface.append(prop)
        # Return XML as a string
        return Xtree.tostring(xml_root, encoding="unicode")

    # --- Interface org.freedesktop.DBus.Properties
    # Note: this interface section must be the last one, or the primitive Introspect()
    # property XML insertion will not work!

    @dbus.service.method(
        dbus_interface=dbus.PROPERTIES_IFACE,
        in_signature="ss",
        out_signature="v",
        sender_keyword="sender",
    )
    def Get(self, interface_name, property_name, sender=None):
        # Return the value of a single property
        # Raises: DBusException (UnknownInterface) unless interface_name is
        #   the modern or the legacy interface name
        _method = _method_name()
        interface_name = _sanitize_input(interface_name)
        property_name = _sanitize_input(property_name)

        known_interfaces = (INTERFACE_NAME, INTERFACE_NAME_LEGACY)
        if interface_name not in known_interfaces:
            raise dbus.exceptions.DBusException(
                f"{_method}(): Unknown interface '{interface_name}'",
                name="org.freedesktop.DBus.Error.UnknownInterface",
            )

        self._logger.debug(
            f"{_method}(interface='{interface_name}', property='{property_name}', sender='{sender}')"
        )

        # The PropertiesChanged signal is later sent on every interface
        # recorded here
        self._ifaces_seen.add(interface_name)

        return self._get_property(property_name)

    @dbus.service.method(
        dbus_interface=dbus.PROPERTIES_IFACE,
        in_signature="s",
        out_signature="a{sv}",
        sender_keyword="sender",
    )
    def GetAll(self, interface_name, sender=None):
        # Return all properties as a dict: property name -> value
        # Raises: DBusException (UnknownInterface) unless interface_name is
        #   the modern or the legacy interface name
        _method = _method_name()
        interface_name = _sanitize_input(interface_name)

        known_interfaces = (INTERFACE_NAME, INTERFACE_NAME_LEGACY)
        if interface_name not in known_interfaces:
            raise dbus.exceptions.DBusException(
                f"{_method}(): Unknown interface '{interface_name}'",
                name="org.freedesktop.DBus.Error.UnknownInterface",
            )

        self._logger.debug(
            f"{_method}(interface='{interface_name}', sender='{sender}')"
        )

        # The PropertiesChanged signal is later sent on every interface
        # recorded here
        self._ifaces_seen.add(interface_name)

        property_names = (
            "ActiveProfile",
            "PerformanceInhibited",
            "PerformanceDegraded",
            "Profiles",
            "Actions",
            "ActionsInfo",
            "ActiveProfileHolds",
            "BatteryAware",
            "LogLevel",
            "Version",
        )
        all_properties = {}
        for name in property_names:
            all_properties[name] = self._get_property(name)
        return all_properties

    @dbus.service.method(
        dbus_interface=dbus.PROPERTIES_IFACE,
        in_signature="ssv",
        out_signature="",
        sender_keyword="sender",
    )
    def Set(self, interface_name, property_name, value, sender):
        # Change the value of a property; the actual work is done by
        # _set_property()
        # Raises: DBusException (UnknownInterface) unless interface_name is
        #   the modern or the legacy interface name
        _method = _method_name()
        interface_name = _sanitize_input(interface_name)
        property_name = _sanitize_input(property_name)
        # The BatteryAware value is passed on untouched; all other values
        # are handled as text and sanitized
        if property_name != "BatteryAware":
            value = _sanitize_input(value)
        sender = _sanitize_input(sender)

        known_interfaces = (INTERFACE_NAME, INTERFACE_NAME_LEGACY)
        if interface_name not in known_interfaces:
            raise dbus.exceptions.DBusException(
                f"{_method}(interface='{interface_name}')",
                name="org.freedesktop.DBus.Error.UnknownInterface",
            )

        self._logger.debug(
            f"{_method}(interface='{interface_name}', property='{property_name}', value='{value}', sender='{sender}')"
        )

        # The PropertiesChanged signal is later sent on every interface
        # recorded here
        self._ifaces_seen.add(interface_name)

        self._set_property(property_name, value, sender)

    @dbus.service.signal(dbus_interface=dbus.PROPERTIES_IFACE, signature="sa{sv}as")
    # Exported on the interface org.freedesktop.DBus.Properties == sender interface
    # Used when a property changes, e.g. ActiveProfile
    # Note: the dbus-python method sends *two* signals with both registered object paths
    # OBJECT_PATH and OBJECT_PATH_LEGACY. Not ideal, but dbus-python can't do better.

    def PropertiesChanged(
        self, interface_name, changed_properties, invalidated_properties
    ):
        # Note: only called from within the daemon, so the arguments are
        # logged without sanitization
        signal_label = f"Signal {_method_name()}"
        self._logger.debug(
            f"{signal_label}(interface='{interface_name}', changed_properties='{changed_properties}', invalidated_properties='{invalidated_properties}')"
        )

    # --- Helpers
    def _get_property(self, property_name):
        # Internal method to get property values
        # Args:
        # - property_name: name of the requested property
        # Returns: the current value; array-typed properties are wrapped in
        #   dbus.Array so that empty arrays carry their element type too
        # Raises: DBusException (UnknownProperty) for an unknown name
        #
        # Note: the 'signature' of dbus.Array describes a single *element*,
        # so an 'aa{sv}' property needs signature="a{sv}"
        if property_name == "ActiveProfile":
            return self._active_profile
        if property_name == "PerformanceInhibited":
            return ""  # Deprecated
        if property_name == "PerformanceDegraded":
            return self._performance_degraded
        if property_name == "Profiles":
            return dbus.Array(self._profiles, signature="a{sv}")
        if property_name == "Actions":
            return dbus.Array(self._actions, signature="s")
        if property_name == "ActionsInfo":
            # Declared as 'aa{sv}' in the introspection data, same as
            # Profiles and ActiveProfileHolds
            return dbus.Array(self._actions_info, signature="a{sv}")
        if property_name == "ActiveProfileHolds":
            return self._get_active_profile_holds()
        if property_name == "Version":
            return self._version
        if property_name == "BatteryAware":
            return self._battery_aware
        if property_name == "LogLevel":
            return self._loglevel

        _method = _method_name()
        raise dbus.exceptions.DBusException(
            f"{_method}(): Unknown property '{property_name}'",
            name="org.freedesktop.DBus.Error.UnknownProperty",
        )

    def _set_property(self, property_name, value, sender):
        # Internal method to set property values
        # Args:
        # - property_name: name of the property to change
        # - value: requested value as passed on by Set()
        # - sender: D-Bus sender, subject of the PolicyKit authorization check
        # Raises: DBusException with
        # - NotAuthorized if PolicyKit denies the change
        # - InvalidArgs if ActiveProfile is not one of AVAILABLE_PROFILES
        # - PropertyReadOnly for every other property
        _method = _method_name()
        self._logger.debug(
            f"{_method}(property_name= '{property_name}', value='{value}')"
        )

        if property_name == "ActiveProfile":
            # Check authorization first
            if not self._check_polkit_auth(sender, f"{BUS_NAME}.switch-profile"):
                raise dbus.exceptions.DBusException(
                    f"{_method}(): Not authorized to switch power profile",
                    name="org.freedesktop.PolicyKit1.Error.NotAuthorized",
                )

            # Validate *before* touching any state: a request for an unknown
            # profile must not cost applications their holds
            new_profile = str(value)
            if new_profile not in AVAILABLE_PROFILES:
                raise dbus.exceptions.DBusException(
                    f"{_method}(): Invalid profile '{new_profile}'",
                    name="org.freedesktop.DBus.Error.InvalidArgs",
                )

            # User (manually) changes profile
            self._drop_all_holds()
            self._apply_profile_with_tlp(new_profile)
            self._selected_profile = new_profile

            # No signal from here: ActiveProfile is announced by SyncProfile()
            return

        if property_name == "BatteryAware":
            # Check authorization first
            if not self._check_polkit_auth(
                sender, f"{BUS_NAME}.configure-battery-aware"
            ):
                raise dbus.exceptions.DBusException(
                    f"{_method}(): Not authorized to configure battery aware",
                    name="org.freedesktop.PolicyKit1.Error.NotAuthorized",
                )

            if isinstance(value, (bool, dbus.Boolean)):
                # Use the truth value directly rather than relying on the
                # text form of a boolean being "1"
                self._battery_aware = bool(value)
            else:
                # Anything else is accepted in its textual form only
                self._battery_aware = str(value) == "1"

        elif property_name == "LogLevel":
            # Check authorization first
            if not self._check_polkit_auth(sender, f"{BUS_NAME}.set-loglevel"):
                raise dbus.exceptions.DBusException(
                    f"{_method}(): Not authorized to set loglevel",
                    name="org.freedesktop.PolicyKit1.Error.NotAuthorized",
                )

            # Every value other than "debug" selects "info"
            if str(value) == "debug":
                self._logger.setLevel(logging.DEBUG)
                self._loglevel = "debug"
            else:
                self._logger.setLevel(logging.INFO)
                self._loglevel = "info"

        else:
            raise dbus.exceptions.DBusException(
                f"{_method}(): Property '{property_name}' is readonly",
                name="org.freedesktop.DBus.Error.PropertyReadOnly",
            )

        # Signal the desktop about the changed property on all interfaces
        # that received Get(), GetAll() or Set(). The value actually stored
        # is reported, which may differ from the one requested (e.g. an
        # unknown LogLevel ends up as "info").
        stored_value = self._get_property(property_name)
        for iface in self._ifaces_seen:
            self.PropertiesChanged(iface, {property_name: stored_value}, [])

    def _update_profile_from_holds(self):
        # Make the active profile follow the current holds:
        # - no hold left: re-apply the last profile selected by the user
        # - otherwise: apply the profile of the newest hold, unless it is
        #   the active profile already
        if not self._holds:
            self._logger.info(
                f"Changing active profile: from hold '{self._active_profile}' to last user selection '{self._selected_profile}'"
            )
            self._apply_profile_with_tlp(self._selected_profile)
            return

        # Holds are stored in the order they were granted, so the newest
        # hold (highest cookie) is the dict's last entry
        newest_hold = list(self._holds.values())[-1]
        new_profile = newest_hold["profile"]
        new_appid = newest_hold["application_id"]
        old_profile = self._active_profile

        if new_profile == old_profile:
            self._logger.info(
                f"Keeping active profile: hold '{new_profile}' (appid: '{new_appid}')"
            )
            return

        self._logger.info(
            f"Changing active profile: from hold '{old_profile}' to '{new_profile}' (appid: '{new_appid}')"
        )
        self._apply_profile_with_tlp(new_profile)

    def _drop_all_holds(self):
        # Release all holds, oldest first, and notify about each release
        _method = _method_name()
        self._logger.debug(f"{_method}()")

        # Iterate over a snapshot of the cookies: the dict shrinks in the loop
        for cookie in list(self._holds):
            hold2release = self._holds.pop(cookie)
            self._logger.info(
                f"Auto-releasing hold: profile '{hold2release['profile']}' (appid: '{hold2release['application_id']}')"
            )

            # Emit ProfileReleased signal to the sender
            ### TODO: Signal is not emitted
            self.ProfileReleased(dbus.UInt32(cookie))

    def _get_active_profile_holds(self):
        # Build the value of the ActiveProfileHolds property: one dict per
        # hold, in the order the holds are stored (cookie and sender are not
        # part of it)
        hold_entries = [
            {
                "ApplicationId": hold["application_id"],
                "Profile": hold["profile"],
                "Reason": hold["reason"],
            }
            for hold in self._holds.values()
        ]
        return dbus.Array(hold_entries, signature="a{sv}")

    # --- TLP helpers
    def _apply_profile_with_tlp(self, profile):
        # Start 'tlp <profile>' to make TLP apply the given profile
        # Raises: DBusException (InvalidArgs) for an unknown profile
        profile_is_known = profile in AVAILABLE_PROFILES
        if not profile_is_known:
            raise dbus.exceptions.DBusException(
                f"{_method_name()}(): Invalid profile '{profile}'",
                name="org.freedesktop.DBus.Error.InvalidArgs",
            )

        # tlp runs detached and is never waited for:
        # * The exit code does not matter, the outcome is reported back
        #   later via SyncProfile()
        # * Waiting could deadlock the event loop, because tlp itself
        #   invokes SyncProfile()
        # Note: start_new_session=True only moves the child out of the
        #   parent's process group, the parent still has to reap it.
        #   Therefore auto-reap must be used (see __main__)
        self._logger.info(f"Run detached TLP to apply profile '{profile}'")
        tlp_command = ["tlp", str(profile)]
        null_streams = {
            "stdin": subprocess.DEVNULL,
            "stdout": subprocess.DEVNULL,
            "stderr": subprocess.DEVNULL,
        }
        _ = subprocess.Popen(
            tlp_command, start_new_session=True, close_fds=True, **null_streams
        )

    def _get_tlp_profile(self):
        # Determine the last profile applied by TLP from its statefile
        # Returns: the profile name, or "balanced" if the statefile cannot
        #   be read or does not start with a known value
        _method = _method_name()

        try:
            with open(TLP_LAST_PROFILE, "r") as statefile:
                # The statefile may hold two space separated values
                fields = statefile.readline().strip().split()
        except OSError as errmsg:
            self._logger.info(
                f"{_method}(): {errmsg} --> TLP may not have applied a profile yet."
            )
            return "balanced"

        # Only the first value is of interest
        last_pp = fields[0] if fields else None

        if last_pp in TLP_PROFILE_IDX:
            # The statefile value is the index into the list of profiles
            tlp_profile = AVAILABLE_PROFILES[int(last_pp)]
            self._logger.debug(f"{_method}(): Current TLP profile is '{tlp_profile}'")
            return tlp_profile

        self._logger.warning(f"{_method}(): Unknown TLP profile '{last_pp}'")
        return "balanced"

    # --- PolicyKit helpers
    def _check_polkit_auth(self, sender, action_id):
        # Check if the D-Bus sender is authorized for the given PolicyKit action.
        # Args:
        # - sender: D-Bus sender string (e.g., ":1.234")
        # - action_id: PolicyKit action identifier (e.g., "org.freedesktop.UPower.PowerProfiles.switch-profile")
        # Returns: bool: True if authorized, False otherwise. Any D-Bus error
        #   during the check and a missing sender count as "not authorized"
        #   (fail closed).
        _method = _method_name()

        if not sender:
            self._logger.error(
                f"{_method}(): PolicyKit authorization check failed: no sender"
            )
            return False

        sender = _sanitize_input(sender)
        action_id = _sanitize_input(action_id)

        # Identify the caller by its unique bus name and leave it to PolicyKit
        # to resolve the process behind it. Looking up the PID here and
        # passing a "unix-process" subject would be racy: the PID may belong
        # to a different process by the time it is evaluated.
        subject = ("system-bus-name", {"name": dbus.String(sender)})

        try:
            # Get PolicyKit authority interface
            polkit = self._bus.get_object(
                "org.freedesktop.PolicyKit1", "/org/freedesktop/PolicyKit1/Authority"
            )
            authority = dbus.Interface(
                polkit, dbus_interface="org.freedesktop.PolicyKit1.Authority"
            )

            # Check authorization
            result = authority.CheckAuthorization(
                subject,
                action_id,
                {},  # details
                dbus.UInt32(
                    0x1
                ),  # flags: AllowUserInteraction (show auth dialog if needed)
                "",  # cancellation_id
            )
        except dbus.exceptions.DBusException as errmsg:
            self._logger.error(
                f"{_method}(): PolicyKit authorization check failed: {errmsg}"
            )
            return False

        # result is a tuple: (is_authorized, is_challenge, details)
        return bool(result[0])


# --- Log helpers
def _method_name():
    # Return the name of the function or method that called this helper
    own_frame = inspect.currentframe()
    caller_frame = own_frame.f_back  # type: ignore[reportOptionalMemberAccess]
    return caller_frame.f_code.co_name  # type: ignore[reportOptionalMemberAccess]


# --- Input sanitation
# Characters that pass _sanitize_input() unchanged
INPUT_ALLOWED_CHARS = (
    string.ascii_letters + string.digits + " !@'+-.,/:;_$&*()%<=>?#[]{|}^~" + '"'
)


def _sanitize_input(input_str, sani_ch="¿"):
    # Replace every character not in INPUT_ALLOWED_CHARS with sani_ch
    # Args:
    # - input_str: text to sanitize; None yields an empty string and any
    #   other non-string value is converted with str() first, so that a
    #   missing sender or a non-text variant cannot raise a TypeError here
    # - sani_ch: replacement for disallowed characters
    # Returns: the sanitized text as a plain str
    if input_str is None:
        return ""
    if not isinstance(input_str, str):
        input_str = str(input_str)
    return "".join(
        ch if ch in INPUT_ALLOWED_CHARS else sani_ch for ch in input_str
    )


# --- MAIN
def main():
    # Entry point: parse the command line, set up syslog logging, export the
    # daemon on the system bus and run the main loop until interrupted

    # --- Command line
    parser = argparse.ArgumentParser(description=DAEMON_NAME)
    _ = parser.add_argument(
        "--debug", "-D", action="store_true", help="log debugging messages"
    )
    args = parser.parse_args()

    # --- The daemon refuses to run without root privileges
    if os.geteuid() != 0:
        print(
            f"Error: root privileges are required to run the {DAEMON_NAME}.",
            file=sys.stderr,
        )
        sys.exit(1)

    # Ignoring SIGCHLD makes the detached tlp subprocesses get reaped
    # automatically instead of lingering as zombies
    signal.signal(signal.SIGCHLD, signal.SIG_IGN)

    # --- Logging goes to syslog; the script's file name is the identifier
    syslog_handler = logging.handlers.SysLogHandler(
        address="/dev/log", facility=LOG_DAEMON
    )
    syslog_handler.setFormatter(
        logging.Formatter(f"{os.path.basename(__file__)}: %(message)s")
    )
    logger = logging.getLogger(__name__)
    logger.addHandler(syslog_handler)

    loglevel = "debug" if args.debug else "info"
    logger.setLevel(logging.DEBUG if args.debug else logging.INFO)

    # --- D-Bus: the GLib main loop integration must be in place before
    # connecting to the system bus
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()

    # Claim well-known names on the bus
    # Note: a name is released as soon as its BusName object becomes
    #   unreferenced, so both objects are kept in (otherwise unused) locals
    #   -> ignore "never used" linter warning
    bus_name = dbus.service.BusName(BUS_NAME, bus)  # noqa: F841
    bus_name_legacy = dbus.service.BusName(BUS_NAME_LEGACY, bus)  # noqa: F841

    # --- Daemon instance and startup report
    daemon = ProfilesDaemon(bus, logger, loglevel)
    startup_report = (
        f"{DAEMON_NAME} started.",
        f"Interfaces: {BUS_NAME}, {BUS_NAME_LEGACY}",
        f"Object path: {OBJECT_PATH}",
        f"Arguments: {vars(args)}",
        f"Loglevel: {loglevel}",
        f"Initial profile: {daemon._active_profile}",
    )
    for line in startup_report:
        logger.info(line)

    # --- Serve requests until interrupted from the keyboard
    try:
        mainloop = GLib.MainLoop()
        mainloop.run()
    except KeyboardInterrupt:
        logger.info(f"{DAEMON_NAME} shutting down ...")


if __name__ == "__main__":
    main()
